package com.wing.actor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.wing.actor.message.DefaultMessage;
import com.wing.actor.message.Message;

/**
 * 默认的Actor管理类
 * @author 杜祥
 * @create 2017年9月11日
 */
public class ActorManager
{
	private static final Logger log = LoggerFactory.getLogger(ActorManager.class);
	
	
	private final static ActorManager DEFAULT_MANAGER = new ActorManager();
	
	
	static 
	{
		String str = System.getProperty("game.wing.actor.maxThreads");
		
		int maxThreads = Runtime.getRuntime().availableProcessors();
		
		if(str != null) 
		{
			str = str.trim();
			
			int size = Integer.parseInt(str);
			
			if(size > 0) 
			{
				maxThreads = size;
			}
		}
		
		DEFAULT_MANAGER.initialize(maxThreads);
		
		log.debug("初始化Actor管理类，maxThreads=" + maxThreads + " -Dgame.wing.actor.maxThreads:" + str);
	}
	
	
	public static ActorManager getDefaultInstance() 
	{
		return DEFAULT_MANAGER;
	}
	
	
	
	/**
	 * 所有的Actors
	 */
	private final ConcurrentMap<String, AbstractActor> allActors = new ConcurrentHashMap<String, AbstractActor>();
	
	
	
	/**
	 * 调度器的控制线程
	 */
	private final List<Thread> dispatcherThreads = new ArrayList<Thread>();
	
	
	/**
	 * 调度器列表
	 */
	private final List<ActorWorkerDispatcher> dispatchers = new ArrayList<ActorWorkerDispatcher>();
	
	
	/**
	 * 调度器分配器
	 */
	private final AtomicInteger distributor = new AtomicInteger(0);
	
	
	/**
	 * 是否在运行中
	 */
	private final AtomicBoolean running = new AtomicBoolean(false);
	
	
	
	private ActorManager() 
	{
		
	}
	
	
	
	/**
	 * 创建Actor
	 * @param actorClass				Actor的类信托
	 * @param name						Actor的名称
	 * @return
	 */
	public AbstractActor createActor(Class<? extends AbstractActor> actorClass, String name)
	{
		if(!running.get())
			throw new RuntimeException("Actor manger is not running.");
		
		AbstractActor actor = null;
		try
		{
			actor = actorClass.newInstance();
			
			actor.setName(name);
			
			if(allActors.putIfAbsent(name, actor) != null)
			{
				throw new RuntimeException("Actor的名称重复,name:" + name);
			}
		}
		catch (InstantiationException | IllegalAccessException e)
		{
			log.error("创建Actor["+ actorClass.getName() +"]出错", e);
		}
		return actor;
	}

	
	/**
	 * 创建并开启Actor
	 * @param actorClass				Actor的类信托
	 * @param name						名称
	 * @return
	 */
	public AbstractActor createAndStartActor(Class<? extends AbstractActor> actorClass, String name)
	{
		if(!running.get())
			throw new RuntimeException("Actor manger is not running.");
		
		AbstractActor actor = createActor(actorClass, name);
		
		if(actor == null) 
		{
			throw new NullPointerException("创建Actor失败。");
		}
		
		startActor(actor);
		
		return actor;
	}

	
	/**
	 * 开始一个Actor
	 * @param actor						需要开始的Actor
	 * @return
	 */
	public boolean startActor(AbstractActor actor)
	{
		if(!running.get())
			throw new RuntimeException("Actor manger is not running.");
		
		if(!allActors.containsKey(actor.getName())) 
		{
			return false;
		}
		
		ActorWorkerDispatcher dispatcher = getDispatcher();
		
		actor.setActive(true);
		
		actor.activate();
		
		actor.registerDispatcher(dispatcher);
		
		return false;
	}

	
	/**
	 * 关闭指定的Actor
	 * @param actor
	 * @return
	 */
	public boolean closeActor(AbstractActor actor)
	{
		if(!running.get())
			throw new RuntimeException("Actor manger is not running.");
		
		if(!allActors.containsKey(actor.getName())) 
		{
			return false;
		}
		
		allActors.remove(actor.getName());
		
		actor.setActive(false);
		
		actor.clearMailbox();
		
		actor.deactivate();
		
		return true;
	}

	
	/**
	 * 向指定的Actor发送消息
	 * @param sender						消息发送者
	 * @param receiver						消息接收者
	 * @param message						消息
	 * @return								返回0则没有发送成功，返回1则成功
	 */
	public int sendMessage(AbstractActor sender, AbstractActor receiver, Message message)
	{
		if(receiver == null)
			throw new NullPointerException("receiver is null.");
		
		if(receiver.hasActive()) 
		{
			receiver.sendMessage(message);
			return 1;
		}
		return 0;
	}
	
	
	
	/**
	 * 向指定的Actor发送消息
	 * @param sender						消息发送者
	 * @param receiver						消息接收者
	 * @param data							消息内容
	 * @return								返回0则没有发送成功，返回1则成功
	 */
	public int sendMessage(AbstractActor sender, AbstractActor receiver, Object data)
	{
		if(receiver == null)
			throw new NullPointerException("receiver is null.");
		
		if(receiver.hasActive()) 
		{
			receiver.sendMessage(new DefaultMessage(sender, data));
			
			return 1;
		}
		
		return 0;
	}
	

	
	/**
	 * 向指定名称的接收者发送消息
	 * @param sender						发送者
	 * @param receiverName					接收者的名称
	 * @param message						消息
	 * @return								返回0则没有发送成功，返回1则成功
	 */
	public int sendMessage(AbstractActor sender, String receiverName, Message message)
	{
		AbstractActor receiver = allActors.get(receiverName);
		
		if(receiver != null && receiver.hasActive()) 
		{
			if(receiver.hasActive())
			{
				receiver.sendMessage(message);
				
				return 1;
			}
		}
		return 0;
	}

	
	/**
	 * 将消息发送给多个接收者
	 * @param sender						发送者
	 * @param receivers						接收者数组
	 * @param message						消息
	 * @return								返回0列表中没有发送者，>=1表示发送成功的数量
	 */
	public int sendMessage(AbstractActor sender, AbstractActor[] receivers, Message message)
	{
		if(receivers == null)
			throw new NullPointerException("receivers is null");
		
		int sendCount = 0;
		AbstractActor receiver = null;
		for(int i = 0; i < receivers.length; i++) 
		{
			receiver = receivers[i];
			
			if(receiver == null)
				continue;
			
			if(!receiver.hasActive())
				continue;
			
			receiver.sendMessage(message);
			
			sendCount += 1;
		}
		return sendCount;
	}


	/**
	 * 将消息发送给多个接收者
	 * @param sender						发送者
	 * @param receivers						接收者列表
	 * @param message						消息
	 * @return								返回0列表中没有发送者，>=1表示发送成功的数量
	 */
	public int sendMessage(AbstractActor sender, Collection<? extends AbstractActor> receivers, Message message)
	{
		if(receivers == null)
			throw new NullPointerException("receivers is null");
		
		int sendCount = 0;
		for(AbstractActor receiver : receivers) 
		{
			if(receiver == null)
				continue;
			
			if(!receiver.hasActive())
				continue;
			
			receiver.sendMessage(message);
			
			sendCount += 1;
		}
		return sendCount;
	}

	
	/**
	 * 将消息发送给指定的组			
	 * @param sender						发送者
	 * @param groupId						组ID
	 * @param message						消息
	 * @return
	 */
	public int sendGroupMessage(AbstractActor sender, String groupId, Message message)
	{
		if(groupId == null || "".equals(groupId))
			throw new NullPointerException("groupId is null");
			
		List<AbstractActor> actors = getActorByGroupId(groupId);
		
		if(actors.isEmpty())
			return 0;
		
		int sendCount = 0;
		for(AbstractActor receiver : actors) 
		{
			if(!receiver.hasActive())
				continue;
			
			receiver.sendMessage(message);
			
			sendCount += 1;
		}
		return sendCount;
	}

	
	/**
	 * 将消息发送给指定类型的所有Actor
	 * @param sender
	 * @param type
	 * @param message
	 * @return
	 */
	public int sendTypeMessage(AbstractActor sender, Class<? extends AbstractActor> type, Message message)
	{
		if(type == null)
			throw new NullPointerException("type is null");
			
		List<AbstractActor> actors = getActorsByType(type);
		
		if(actors.isEmpty())
			return 0;
		
		int sendCount = 0;
		for(AbstractActor receiver : actors) 
		{
			if(!receiver.hasActive())
				continue;
			
			receiver.sendMessage(message);
			
			sendCount += 1;
		}
		return sendCount;
	}

	
	/**
	 * 将指定的消息广播给所有的Actor
	 * @param sender
	 * @param message
	 * @return
	 */
	public int broadcast(AbstractActor sender, Message message)
	{
		int sendCount = 0;
		
		for(AbstractActor receiver : allActors.values()) 
		{
			if(!receiver.hasActive())
				continue;
			
			receiver.sendMessage(message);
			
			sendCount ++;
		}
		return sendCount;
	}
	
	
	
	/**
	 * 获得Actor总数
	 * @return
	 */
	public int getActorCount()
	{
		return allActors.size();
	}

	
	/**
	 * 获得指定类型的Actor的总数
	 * @param clazz
	 * @return
	 */
	public int getActorCount(Class<? extends AbstractActor> clazz)
	{
		int count = 0;
		for(Entry<String, AbstractActor> e : allActors.entrySet()) 
		{
			if(e.getValue().getClass().equals(clazz)) 
			{
				count += 1;
			}
		}
		return count;
	}
	
	
	
	/**
	 * 获得指定类型的所有Actor
	 * @param clazz
	 * @return
	 */
	public List<AbstractActor> getActorsByType(Class<? extends AbstractActor> clazz)
	{
		List<AbstractActor> actors = new ArrayList<AbstractActor>();
		for(Entry<String, AbstractActor> e : allActors.entrySet()) 
		{
			if(e.getValue().getClass().equals(clazz)) 
			{
				actors.add(e.getValue());
			}
		}
		return actors;
	}
	
	
	
	
	/**
	 * 获得指定组ID的Actor数量
	 * @param groupId
	 * @return
	 */
	public int getGroupActorCount(String groupId) 
	{
		int count = 0;
		for(Entry<String, AbstractActor> e : allActors.entrySet()) 
		{
			if(e.getValue().getGroup().equals(groupId)) 
			{
				count += 1;
			}
		}
		return count;
	}
	
	
	/**
	 * 获得指定groupId的所有Actor
	 * @param groupId
	 * @return
	 */
	public List<AbstractActor> getActorByGroupId(String groupId)
	{
		List<AbstractActor> list = new ArrayList<AbstractActor>();
		for(Entry<String, AbstractActor> e : allActors.entrySet()) 
		{
			if(e.getValue().getGroup().equals(groupId)) 
			{
				list.add(e.getValue());
			}
		}
		return list;
	}
	
	
	
	
	/**
	 * 初始化该Actor管理类
	 * @param maxThreads			最大线程数
	 */
	public void initialize(int maxThreads) 
	{
		for(int i = 0; i < maxThreads; i++) 
		{
			ActorWorkerDispatcher dispatcher = new ActorWorkerDispatcher(Short.MAX_VALUE, this);
		
			Thread thread = createThread(i + 1, dispatcher);
			
			thread.start();
			
			dispatchers.add(dispatcher);
			
			dispatcherThreads.add(thread);
		}
		
		running.set(true);
	}
	
	
	/**
	 * 创建线程
	 * @param index
	 * @param dispatcher
	 * @return
	 */
	private Thread createThread(int index, ActorWorkerDispatcher dispatcher) 
	{
		Thread thread = new Thread(dispatcher, "actor-dispatcher-" + index);
		
		thread.setPriority(Thread.NORM_PRIORITY);
		
		return thread;
	}
	
	
	
	
	
	/**
	 * 获得一个调度器
	 * @return
	 */
	private ActorWorkerDispatcher getDispatcher() 
	{
		int index = distributor.getAndIncrement();
		
		index = index % dispatchers.size();
		
		ActorWorkerDispatcher dispatcher = dispatchers.get(index);
		
		return dispatcher;
	}
	
	


	/**
	 * 关闭Actor管理类
	 */
	public void shutdown()
	{
		if(running.compareAndSet(true, false)) 
		{
			for(AbstractActor actor : allActors.values())
			{
				closeActor(actor);
			}
			
			allActors.clear();
			
			for(ActorWorkerDispatcher dispatcher : dispatchers) 
			{
				dispatcher.shutdown();
			}
			
			try
			{
				Thread.sleep(50);
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
			
			for(Thread thread : dispatcherThreads) 
			{
				thread.interrupt();
			}
			
			dispatchers.clear();
			dispatcherThreads.clear();
			
		}
	}
	

}
